home *** CD-ROM | disk | FTP | other *** search
/ Windows Game Programming for Dummies (2nd Edition) / WinGamProgFD.iso / pc / DirectX SDK / DXSDK / samples / Multimedia / DirectShow / BaseClasses / outputq.cpp < prev    next >
Encoding:
C/C++ Source or Header  |  2001-10-08  |  23.3 KB  |  789 lines

  1. //------------------------------------------------------------------------------
  2. // File: OutputQ.cpp
  3. //
  4. // Desc: DirectShow base classes - implements COutputQueue class used by an
  5. //       output pin which may sometimes want to queue output samples on a
  6. //       separate thread and sometimes call Receive() directly on the input
  7. //       pin.
  8. //
  9. // Copyright (c) 1992-2001 Microsoft Corporation.  All rights reserved.
  10. //------------------------------------------------------------------------------
  11.  
  12.  
  13. #include <streams.h>
  14.  
  15.  
  16. //
  17. //  COutputQueue Constructor :
  18. //
  19. //  Determines if a thread is to be created and creates resources
  20. //
  21. //     pInputPin  - the downstream input pin we're queueing samples to
  22. //
  23. //     phr        - changed to a failure code if this function fails
  24. //                  (otherwise unchanges)
  25. //
  26. //     bAuto      - Ask pInputPin if it can block in Receive by calling
  27. //                  its ReceiveCanBlock method and create a thread if
  28. //                  it can block, otherwise not.
  29. //
  30. //     bQueue     - if bAuto == FALSE then we create a thread if and only
  31. //                  if bQueue == TRUE
  32. //
  33. //     lBatchSize - work in batches of lBatchSize
  34. //
  35. //     bBatchEact - Use exact batch sizes so don't send until the
  36. //                  batch is full or SendAnyway() is called
  37. //
  38. //     lListSize  - If we create a thread make the list of samples queued
  39. //                  to the thread have this size cache
  40. //
  41. //     dwPriority - If we create a thread set its priority to this
  42. //
  43. COutputQueue::COutputQueue(
  44.              IPin         *pInputPin,          //  Pin to send stuff to
  45.              HRESULT      *phr,                //  'Return code'
  46.              BOOL          bAuto,              //  Ask pin if queue or not
  47.              BOOL          bQueue,             //  Send through queue
  48.              LONG          lBatchSize,         //  Batch
  49.              BOOL          bBatchExact,        //  Batch exactly to BatchSize
  50.              LONG          lListSize,
  51.              DWORD         dwPriority,
  52.              bool          bFlushingOpt        // flushing optimization
  53.             ) : m_lBatchSize(lBatchSize),
  54.                 m_bBatchExact(bBatchExact && (lBatchSize > 1)),
  55.                 m_hThread(NULL),
  56.                 m_hSem(NULL),
  57.                 m_List(NULL),
  58.                 m_pPin(pInputPin),
  59.                 m_ppSamples(NULL),
  60.                 m_lWaiting(0),
  61.                 m_pInputPin(NULL),
  62.                 m_bSendAnyway(FALSE),
  63.                 m_nBatched(0),
  64.                 m_bFlushing(FALSE),
  65.                 m_bFlushed(TRUE),
  66.                 m_bFlushingOpt(bFlushingOpt),
  67.                 m_bTerminate(FALSE),
  68.                 m_hEventPop(NULL),
  69.                 m_hr(S_OK) {
  70.     ASSERT(m_lBatchSize > 0);
  71.  
  72.  
  73.     if(FAILED(*phr)) {
  74.         return;
  75.     }
  76.  
  77.     //  Check the input pin is OK and cache its IMemInputPin interface
  78.  
  79.     *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin);
  80.     if(FAILED(*phr)) {
  81.         return;
  82.     }
  83.  
  84.     // See if we should ask the downstream pin
  85.  
  86.     if(bAuto) {
  87.         HRESULT hr = m_pInputPin->ReceiveCanBlock();
  88.         if(SUCCEEDED(hr)) {
  89.             bQueue = hr == S_OK;
  90.         }
  91.     }
  92.  
  93.     //  Create our sample batch
  94.  
  95.     m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
  96.     if(m_ppSamples == NULL) {
  97.         *phr = E_OUTOFMEMORY;
  98.         return;
  99.     }
  100.  
  101.     //  If we're queueing allocate resources
  102.  
  103.     if(bQueue) {
  104.         DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));
  105.         m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
  106.         if(m_hSem == NULL) {
  107.             DWORD dwError = GetLastError();
  108.             *phr = AmHresultFromWin32(dwError);
  109.             return;
  110.         }
  111.         m_List = new CSampleList(NAME("Sample Queue List"),
  112.             lListSize,
  113.             FALSE         // No lock
  114.             );
  115.         if(m_List == NULL) {
  116.             *phr = E_OUTOFMEMORY;
  117.             return;
  118.         }
  119.  
  120.  
  121.         DWORD dwThreadId;
  122.         m_hThread = CreateThread(NULL,
  123.             0,
  124.             InitialThreadProc,
  125.             (LPVOID)this,
  126.             0,
  127.             &dwThreadId);
  128.         if(m_hThread == NULL) {
  129.             DWORD dwError = GetLastError();
  130.             *phr = AmHresultFromWin32(dwError);
  131.             return;
  132.         }
  133.         SetThreadPriority(m_hThread, dwPriority);
  134.     }
  135.     else {
  136.         DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
  137.     }
  138. }
  139.  
  140. //
  141. //  COutputQueuee Destructor :
  142. //
  143. //  Free all resources -
  144. //
  145. //      Thread,
  146. //      Batched samples
  147. //
  148. COutputQueue::~COutputQueue() {
  149.     DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));
  150.     /*  Free our pointer */
  151.     if(m_pInputPin != NULL) {
  152.         m_pInputPin->Release();
  153.     }
  154.     if(m_hThread != NULL) { {
  155.             CAutoLock lck(this);
  156.             m_bTerminate = TRUE;
  157.             m_hr = S_FALSE;
  158.             NotifyThread();
  159.         }
  160.         DbgWaitForSingleObject(m_hThread);
  161.         EXECUTE_ASSERT(CloseHandle(m_hThread));
  162.  
  163.         //  The thread frees the samples when asked to terminate
  164.  
  165.         ASSERT(m_List->GetCount() == 0);
  166.         delete m_List;
  167.     }
  168.     else {
  169.         FreeSamples();
  170.     }
  171.     if(m_hSem != NULL) {
  172.         EXECUTE_ASSERT(CloseHandle(m_hSem));
  173.     }
  174.     delete [] m_ppSamples;
  175. }
  176.  
  177. //
  178. //  Call the real thread proc as a member function
  179. //
  180. DWORD WINAPI COutputQueue::InitialThreadProc(LPVOID pv) {
  181.     HRESULT hrCoInit = CAMThread::CoInitializeHelper();
  182.  
  183.         COutputQueue *pSampleQueue = (COutputQueue *)pv;
  184.     DWORD dwReturn = pSampleQueue->ThreadProc();
  185.  
  186.     if(hrCoInit == S_OK) {
  187.         CoUninitialize();
  188.     }
  189.  
  190.     return dwReturn;
  191. }
  192.  
  193. //
  194. //  Thread sending the samples downstream :
  195. //
  196. //  When there is nothing to do the thread sets m_lWaiting (while
  197. //  holding the critical section) and then waits for m_hSem to be
  198. //  set (not holding the critical section)
  199. //
  200. DWORD COutputQueue::ThreadProc() {
  201.     while(TRUE) {
  202.         BOOL          bWait = FALSE;
  203.         IMediaSample *pSample=0;
  204.         LONG          lNumberToSend=0; // Local copy
  205.         NewSegmentPacket* ppacket=0;
  206.  
  207.         //
  208.         //  Get a batch of samples and send it if possible
  209.         //  In any case exit the loop if there is a control action
  210.         //  requested
  211.         //
  212.         {
  213.             CAutoLock lck(this);
  214.             while(TRUE) {
  215.  
  216.                 if(m_bTerminate) {
  217.                     FreeSamples();
  218.                     return 0;
  219.                 }
  220.                 if(m_bFlushing) {
  221.                     FreeSamples();
  222.                     SetEvent(m_evFlushComplete);
  223.                 }
  224.  
  225.                 //  Get a sample off the list
  226.  
  227.                 pSample = m_List->RemoveHead();
  228.                 // inform derived class we took something off the queue
  229.                 if(m_hEventPop) {
  230.                     //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  231.                     SetEvent(m_hEventPop);
  232.                 }
  233.  
  234.                 if(pSample != NULL &&
  235.                     !IsSpecialSample(pSample)) {
  236.  
  237.                     //  If its just a regular sample just add it to the batch
  238.                     //  and exit the loop if the batch is full
  239.  
  240.                     m_ppSamples[m_nBatched++] = pSample;
  241.                     if(m_nBatched == m_lBatchSize) {
  242.                         break;
  243.                     }
  244.                 }
  245.                 else {
  246.  
  247.                     //  If there was nothing in the queue and there's nothing
  248.                     //  to send (either because there's nothing or the batch
  249.                     //  isn't full) then prepare to wait
  250.  
  251.                     if(pSample == NULL &&
  252.                         (m_bBatchExact || m_nBatched == 0)) {
  253.  
  254.                         //  Tell other thread to set the event when there's
  255.                         //  something do to
  256.  
  257.                         ASSERT(m_lWaiting == 0);
  258.                         m_lWaiting++;
  259.                         bWait      = TRUE;
  260.                     }
  261.                     else {
  262.  
  263.                         //  We break out of the loop on SEND_PACKET unless
  264.                         //  there's nothing to send
  265.  
  266.                         if(pSample == SEND_PACKET && m_nBatched == 0) {
  267.                             continue;
  268.                         }
  269.  
  270.                         if(pSample == NEW_SEGMENT) {
  271.                             // now we need the parameters - we are
  272.                             // guaranteed that the next packet contains them
  273.                             ppacket = (NewSegmentPacket *) m_List->RemoveHead();
  274.                             // we took something off the queue
  275.                             if(m_hEventPop) {
  276.                                 //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  277.                                 SetEvent(m_hEventPop);
  278.                             }
  279.  
  280.                             ASSERT(ppacket);
  281.                         }
  282.                         //  EOS_PACKET falls through here and we exit the loop
  283.                         //  In this way it acts like SEND_PACKET
  284.                     }
  285.                     break;
  286.                 }
  287.             }
  288.             if(!bWait) {
  289.                 // We look at m_nBatched from the client side so keep
  290.                 // it up to date inside the critical section
  291.                 lNumberToSend = m_nBatched;  // Local copy
  292.                 m_nBatched = 0;
  293.             }
  294.         }
  295.  
  296.         //  Wait for some more data
  297.  
  298.         if(bWait) {
  299.             DbgWaitForSingleObject(m_hSem);
  300.             continue;
  301.         }
  302.  
  303.  
  304.  
  305.         //  OK - send it if there's anything to send
  306.         //  We DON'T check m_bBatchExact here because either we've got
  307.         //  a full batch or we dropped through because we got
  308.         //  SEND_PACKET or EOS_PACKET - both of which imply we should
  309.         //  flush our batch
  310.  
  311.         if(lNumberToSend != 0) {
  312.             long nProcessed;
  313.             if(m_hr == S_OK) {
  314.                 ASSERT(!m_bFlushed);
  315.                 HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
  316.                     lNumberToSend,
  317.                     &nProcessed);
  318.                 /*  Don't overwrite a flushing state HRESULT */
  319.                 CAutoLock lck(this);
  320.                 if(m_hr == S_OK) {
  321.                     m_hr = hr;
  322.                 }
  323.                 ASSERT(!m_bFlushed);
  324.             }
  325.             while(lNumberToSend != 0) {
  326.                 m_ppSamples[--lNumberToSend]->Release();
  327.             }
  328.             if(m_hr != S_OK) {
  329.  
  330.                 //  In any case wait for more data - S_OK just
  331.                 //  means there wasn't an error
  332.  
  333.                 DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
  334.                     m_hr));
  335.             }
  336.         }
  337.  
  338.         //  Check for end of stream
  339.  
  340.         if(pSample == EOS_PACKET) {
  341.  
  342.             //  We don't send even end of stream on if we've previously
  343.             //  returned something other than S_OK
  344.             //  This is because in that case the pin which returned
  345.             //  something other than S_OK should have either sent
  346.             //  EndOfStream() or notified the filter graph
  347.  
  348.             if(m_hr == S_OK) {
  349.                 DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
  350.                 HRESULT hr = m_pPin->EndOfStream();
  351.                 if(FAILED(hr)) {
  352.                     DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
  353.                 }
  354.             }
  355.         }
  356.  
  357.         //  Data from a new source
  358.  
  359.         if(pSample == RESET_PACKET) {
  360.             m_hr = S_OK;
  361.             SetEvent(m_evFlushComplete);
  362.         }
  363.  
  364.         if(pSample == NEW_SEGMENT) {
  365.             m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
  366.             delete ppacket;
  367.         }
  368.     }
  369. }
  370.  
  371. //  Send batched stuff anyway
  372. void COutputQueue::SendAnyway() {
  373.     if(!IsQueued()) {
  374.  
  375.         //  m_bSendAnyway is a private parameter checked in ReceiveMultiple
  376.  
  377.         m_bSendAnyway = TRUE;
  378.         LONG nProcessed;
  379.         ReceiveMultiple(NULL, 0, &nProcessed);
  380.         m_bSendAnyway = FALSE;
  381.  
  382.     }
  383.     else {
  384.         CAutoLock lck(this);
  385.         QueueSample(SEND_PACKET);
  386.         NotifyThread();
  387.     }
  388. }
  389.  
  390. void
  391. COutputQueue::NewSegment(
  392.     REFERENCE_TIME tStart,
  393.     REFERENCE_TIME tStop,
  394.     double dRate) {
  395.     if(!IsQueued()) {
  396.         if(S_OK == m_hr) {
  397.             if(m_bBatchExact) {
  398.                 SendAnyway();
  399.             }
  400.             m_pPin->NewSegment(tStart, tStop, dRate);
  401.         }
  402.     }
  403.     else {
  404.         if(m_hr == S_OK) {
  405.             //
  406.             // we need to queue the new segment to appear in order in the
  407.             // data, but we need to pass parameters to it. Rather than
  408.             // take the hit of wrapping every single sample so we can tell
  409.             // special ones apart, we queue special pointers to indicate
  410.             // special packets, and we guarantee (by holding the
  411.             // critical section) that the packet immediately following a
  412.             // NEW_SEGMENT value is a NewSegmentPacket containing the
  413.             // parameters.
  414.             NewSegmentPacket * ppack = new NewSegmentPacket;
  415.             if(ppack == NULL) {
  416.                 return;
  417.             }
  418.             ppack->tStart = tStart;
  419.             ppack->tStop = tStop;
  420.             ppack->dRate = dRate;
  421.  
  422.             CAutoLock lck(this);
  423.             QueueSample(NEW_SEGMENT);
  424.             QueueSample((IMediaSample*) ppack);
  425.             NotifyThread();
  426.         }
  427.     }
  428. }
  429.  
  430.  
  431. //
  432. //  End of Stream is queued to output device
  433. //
  434. void COutputQueue::EOS() {
  435.     CAutoLock lck(this);
  436.     if(!IsQueued()) {
  437.         if(m_bBatchExact) {
  438.             SendAnyway();
  439.         }
  440.         if(m_hr == S_OK) {
  441.             DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
  442.             m_bFlushed = FALSE;
  443.             HRESULT hr = m_pPin->EndOfStream();
  444.             if(FAILED(hr)) {
  445.                 DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
  446.             }
  447.         }
  448.     }
  449.     else {
  450.         if(m_hr == S_OK) {
  451.             m_bFlushed = FALSE;
  452.             QueueSample(EOS_PACKET);
  453.             NotifyThread();
  454.         }
  455.     }
  456. }
  457.  
  458. //
  459. //  Flush all the samples in the queue
  460. //
  461. void COutputQueue::BeginFlush() {
  462.     if(IsQueued()) { {
  463.             CAutoLock lck(this);
  464.  
  465.             // block receives -- we assume this is done by the
  466.             // filter in which we are a component
  467.  
  468.             // discard all queued data
  469.  
  470.             m_bFlushing = TRUE;
  471.  
  472.             //  Make sure we discard all samples from now on
  473.  
  474.             if(m_hr == S_OK) {
  475.                 m_hr = S_FALSE;
  476.             }
  477.  
  478.             // Optimize so we don't keep calling downstream all the time
  479.  
  480.             if(m_bFlushed && m_bFlushingOpt) {
  481.                 return;
  482.             }
  483.  
  484.             // Make sure we really wait for the flush to complete
  485.             m_evFlushComplete.Reset();
  486.  
  487.             NotifyThread();
  488.         }
  489.  
  490.         // pass this downstream
  491.  
  492.         m_pPin->BeginFlush();
  493.     }
  494.     else {
  495.         // pass downstream first to avoid deadlocks
  496.         m_pPin->BeginFlush();
  497.         CAutoLock lck(this);
  498.         // discard all queued data
  499.  
  500.         m_bFlushing = TRUE;
  501.  
  502.         //  Make sure we discard all samples from now on
  503.  
  504.         if(m_hr == S_OK) {
  505.             m_hr = S_FALSE;
  506.         }
  507.     }
  508.  
  509. }
  510.  
  511. //
  512. // leave flush mode - pass this downstream
  513. void COutputQueue::EndFlush() { {
  514.         CAutoLock lck(this);
  515.         ASSERT(m_bFlushing);
  516.         if(m_bFlushingOpt && m_bFlushed && IsQueued()) {
  517.             m_bFlushing = FALSE;
  518.             m_hr = S_OK;
  519.             return;
  520.         }
  521.     }
  522.  
  523.     // sync with pushing thread -- done in BeginFlush
  524.     // ensure no more data to go downstream -- done in BeginFlush
  525.     //
  526.     // Because we are synching here there is no need to hold the critical
  527.     // section (in fact we'd deadlock if we did!)
  528.  
  529.     if(IsQueued()) {
  530.         m_evFlushComplete.Wait();
  531.     }
  532.     else {
  533.         FreeSamples();
  534.     }
  535.  
  536.     //  Be daring - the caller has guaranteed no samples will arrive
  537.     //  before EndFlush() returns
  538.  
  539.     m_bFlushing = FALSE;
  540.     m_bFlushed  = TRUE;
  541.  
  542.     // call EndFlush on downstream pins
  543.  
  544.     m_pPin->EndFlush();
  545.  
  546.     m_hr = S_OK;
  547. }
  548.  
  549. //  COutputQueue::QueueSample
  550. //
  551. //  private method to Send a sample to the output queue
  552. //  The critical section MUST be held when this is called
  553.  
  554. void COutputQueue::QueueSample(IMediaSample *pSample) {
  555.     if(NULL == m_List->AddTail(pSample)) {
  556.         if(!IsSpecialSample(pSample)) {
  557.             pSample->Release();
  558.         }
  559.     }
  560. }
  561.  
  562. //
  563. //  COutputQueue::Receive()
  564. //
  565. //  Send a single sample by the multiple sample route
  566. //  (NOTE - this could be optimized if necessary)
  567. //
  568. //  On return the sample will have been Release()'d
  569. //
  570.  
  571. HRESULT COutputQueue::Receive(IMediaSample *pSample) {
  572.     LONG nProcessed;
  573.     return ReceiveMultiple(&pSample, 1, &nProcessed);
  574. }
  575.  
  576. //
  577. //  COutputQueue::ReceiveMultiple()
  578. //
  579. //  Send a set of samples to the downstream pin
  580. //
  581. //      ppSamples           - array of samples
  582. //      nSamples            - how many
  583. //      nSamplesProcessed   - How many were processed
  584. //
  585. //  On return all samples will have been Release()'d
  586. //
  587.  
  588. HRESULT COutputQueue::ReceiveMultiple(
  589.     IMediaSample **ppSamples,
  590.     long nSamples,
  591.     long *nSamplesProcessed) {
  592.     CAutoLock lck(this);
  593.     //  Either call directly or queue up the samples
  594.  
  595.     if(!IsQueued()) {
  596.  
  597.         //  If we already had a bad return code then just return
  598.  
  599.         if(S_OK != m_hr) {
  600.  
  601.             //  If we've never received anything since the last Flush()
  602.             //  and the sticky return code is not S_OK we must be
  603.             //  flushing
  604.             //  ((!A || B) is equivalent to A implies B)
  605.             ASSERT(!m_bFlushed || m_bFlushing);
  606.  
  607.             //  We're supposed to Release() them anyway!
  608.             *nSamplesProcessed = 0;
  609.             for(int i = 0; i < nSamples; i++) {
  610.                 DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
  611.                     nSamples, m_hr));
  612.                 ppSamples[i]->Release();
  613.             }
  614.  
  615.             return m_hr;
  616.         }
  617.         //
  618.         //  If we're flushing the sticky return code should be S_FALSE
  619.         //
  620.         ASSERT(!m_bFlushing);
  621.         m_bFlushed = FALSE;
  622.  
  623.         ASSERT(m_nBatched < m_lBatchSize);
  624.         ASSERT(m_nBatched == 0 || m_bBatchExact);
  625.  
  626.         //  Loop processing the samples in batches
  627.  
  628.         LONG iLost = 0;
  629.         for(long iDone = 0;
  630.             iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);) {
  631.  
  632.             //pragma message (REMIND("Implement threshold scheme"))
  633.             ASSERT(m_nBatched < m_lBatchSize);
  634.             if(iDone < nSamples) {
  635.                 m_ppSamples[m_nBatched++] = ppSamples[iDone++];
  636.             }
  637.             if(m_nBatched == m_lBatchSize ||
  638.                 nSamples == 0 && (m_bSendAnyway || !m_bBatchExact)) {
  639.                 LONG nDone;
  640.                 DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
  641.                     m_nBatched));
  642.  
  643.                 if(m_hr == S_OK) {
  644.                     m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
  645.                         m_nBatched,
  646.                         &nDone);
  647.                 }
  648.                 else {
  649.                     nDone = 0;
  650.                 }
  651.                 iLost += m_nBatched - nDone;
  652.                 for(LONG i = 0; i < m_nBatched; i++) {
  653.                     m_ppSamples[i]->Release();
  654.                 }
  655.                 m_nBatched = 0;
  656.             }
  657.         }
  658.         *nSamplesProcessed = iDone - iLost;
  659.         if(*nSamplesProcessed < 0) {
  660.             *nSamplesProcessed = 0;
  661.         }
  662.         return m_hr;
  663.     }
  664.     else {
  665.         /*  We're sending to our thread */
  666.  
  667.         if(m_hr != S_OK) {
  668.             *nSamplesProcessed = 0;
  669.             DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
  670.                 nSamples, m_hr));
  671.             for(int i = 0; i < nSamples; i++) {
  672.                 ppSamples[i]->Release();
  673.             }
  674.             return m_hr;
  675.         }
  676.         m_bFlushed = FALSE;
  677.         for(long i = 0; i < nSamples; i++) {
  678.             QueueSample(ppSamples[i]);
  679.         }
  680.         *nSamplesProcessed = nSamples;
  681.         if(!m_bBatchExact ||
  682.             m_nBatched + m_List->GetCount() >= m_lBatchSize) {
  683.             NotifyThread();
  684.         }
  685.     }
  686.  
  687.     return S_OK;
  688. }
  689.  
  690. //  Get ready for new data - cancels sticky m_hr
  691. void COutputQueue::Reset() {
  692.     if(!IsQueued()) {
  693.         m_hr = S_OK;
  694.     }
  695.     else {
  696.         CAutoLock lck(this);
  697.         QueueSample(RESET_PACKET);
  698.         NotifyThread();
  699.         m_evFlushComplete.Wait();
  700.     }
  701. }
  702.  
  703. //  Remove and Release() all queued and Batched samples
  704. void COutputQueue::FreeSamples() {
  705.     CAutoLock lck(this);
  706.     if(IsQueued()) {
  707.         while(TRUE) {
  708.             IMediaSample *pSample = m_List->RemoveHead();
  709.             // inform derived class we took something off the queue
  710.             if(m_hEventPop) {
  711.                 //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  712.                 SetEvent(m_hEventPop);
  713.             }
  714.  
  715.             if(pSample == NULL) {
  716.                 break;
  717.             }
  718.             if(!IsSpecialSample(pSample)) {
  719.                 pSample->Release();
  720.             }
  721.             else {
  722.                 if(pSample == NEW_SEGMENT) {
  723.                     //  Free NEW_SEGMENT packet
  724.                     NewSegmentPacket *ppacket =
  725.                         (NewSegmentPacket *) m_List->RemoveHead();
  726.                     // inform derived class we took something off the queue
  727.                     if(m_hEventPop) {
  728.                         //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  729.                         SetEvent(m_hEventPop);
  730.                     }
  731.  
  732.                     ASSERT(ppacket != NULL);
  733.                     delete ppacket;
  734.                 }
  735.             }
  736.         }
  737.     }
  738.     for(int i = 0; i < m_nBatched; i++) {
  739.         m_ppSamples[i]->Release();
  740.     }
  741.     m_nBatched = 0;
  742. }
  743.  
  744. //  Notify the thread if there is something to do
  745. //
  746. //  The critical section MUST be held when this is called
  747. void COutputQueue::NotifyThread() {
  748.     //  Optimize - no need to signal if it's not waiting
  749.     ASSERT(IsQueued());
  750.     if(m_lWaiting) {
  751.         ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
  752.         m_lWaiting = 0;
  753.     }
  754. }
  755.  
  756. //  See if there's any work to do
  757. //  Returns
  758. //      TRUE  if there is nothing on the queue and nothing in the batch
  759. //            and all data has been sent
  760. //      FALSE otherwise
  761. //
  762. BOOL COutputQueue::IsIdle() {
  763.     CAutoLock lck(this);
  764.  
  765.     //  We're idle if
  766.     //      there is no thread (!IsQueued()) OR
  767.     //      the thread is waiting for more work  (m_lWaiting != 0)
  768.     //  AND
  769.     //      there's nothing in the current batch (m_nBatched == 0)
  770.  
  771.     if(IsQueued() && m_lWaiting == 0 || m_nBatched != 0) {
  772.         return FALSE;
  773.     }
  774.     else {
  775.  
  776.         //  If we're idle it shouldn't be possible for there
  777.         //  to be anything on the work queue
  778.         ASSERT(!IsQueued() || m_List->GetCount() == 0);
  779.     }
  780.  
  781.     return TRUE;
  782. }
  783.  
  784.  
  785. void COutputQueue::SetPopEvent(HANDLE hEvent) {
  786.     m_hEventPop = hEvent;
  787. }
  788.  
  789.